This notebook is part of PyBroMo a python-based single-molecule Brownian motion diffusion simulator that simulates confocal smFRET experiments.
In [ ]:
from pathlib import Path
from textwrap import dedent, indent
import numpy as np
import tables
from scipy.stats import expon
import phconvert as phc
print('phconvert version:', phc.__version__)
In [ ]:
SIM_PATH = 'data/'
In [ ]:
filelist = list(Path(SIM_PATH).glob('smFRET_*_600s.hdf5'))
[f.name for f in filelist]
In [ ]:
filename_a = str([f for f in filelist if '11_E_40_Em' in f.name][0])
filename_b = str([f for f in filelist if '11_E_75_Em' in f.name][0])
filename_a, filename_b
In [ ]:
da = tables.open_file(filename_a)
db = tables.open_file(filename_b)
In [ ]:
da.filename
In [ ]:
db.filename
In [ ]:
print(da.root.description.read().decode())
In [ ]:
print(db.root.description.read().decode())
In [ ]:
# Make sure files a re using the same trajectories
assert da.root.description.read().decode().split('\n')[5] == db.root.description.read().decode().split('\n')[5]
In [ ]:
# Timestamps
times_a = da.root.photon_data.timestamps.read()
times_b = db.root.photon_data.timestamps.read()
# Detectors
det_a = da.root.photon_data.detectors.read()
det_b = db.root.photon_data.detectors.read()
# Particle number for each timestamp
par_a = da.root.photon_data.particles.read()
par_b = db.root.photon_data.particles.read()
In [ ]:
par_a
In [ ]:
acquisition_duration = da.root.acquisition_duration.read()
assert acquisition_duration == db.root.acquisition_duration.read()
print('Acquisition duration: %d s' % acquisition_duration)
In [ ]:
times_unit = da.root.photon_data.timestamps_specs.timestamps_unit.read()
times_unit
In [ ]:
tau_a = 1e-3 / 5
tau_b = 0.5e-3 / 5
tau_s = [tau_a, tau_b]
In [ ]:
expon_s = tuple(expon(scale=tau / times_unit) for tau in tau_s)
In [ ]:
n = int(1.1 * acquisition_duration / (tau_a + tau_b)) # Number of transitions (upper limit)
In [ ]:
def sim_two_states_single_particle(times_s, taus_s):
"""Simulate 2-state transitions for a single particle.
Arguments:
times_s (tuple or arrays): 2-tuple of timestamps arrays
for the two states a (times_s[0]) and b (times_s[1]).
taus_s (tuple or arrays): 2-tuple of residence times arrays
for the two states a (taus_s[0]) and b (taus_s[1]).
Returns:
List of index pairs. Each pair is a start/stop index for
for the timestamps of current state for a specific residence time.
The states are strictly alternating starting from 0 (i.e. a).
- first pair: (state = 0) start/stop index for array `times_s[0]` (where 0 = state)
corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[0][0]`
- second pair: (state = 1) start/stop index for array `times_s[1]` (where 1 = state)
corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[1][0]`
- third pair: (state = 0) start/stop index for array `times_s[0]` (where 0 = state)
corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[0][1]`
and so on.
"""
slices_list = []
index_s = [0, 0] # indexes for looping thorugh the timestamps arrays
index_start_s = [0, 0] # indexes of current state start in each timestamps array
index_tau_s = [0, 0] # index of current time window duration
t_start = 0 # time of current state start
state, otherstate = 0, 1
while ((index_s[0] < len(times_s[0]) - 1) and
(index_s[1] < len(times_s[1]) - 1)):
# Duration of current time window (i.e. duration of current state)
tau = taus_s[state][index_tau_s[state]]
# Find timestamps in current time window
# for both timestamps arrays
for state_i in (0, 1):
times = times_s[state_i]
delta_t = times[index_s[state_i]] - t_start
while delta_t < tau and index_s[state_i] < len(times) - 1:
index_s[state_i] += 1
delta_t = times[index_s[state_i]] - t_start
#print(state, index_s[state])
# Save the timestamps only for current state
slices_list.append((index_start_s[state], index_s[state]))
# Save index of first timestamp in next time window
index_start_s = index_s.copy()
# Discard current "used" tau
index_tau_s[state] += 1
# Switch to a new state
t_start += tau
state, otherstate = otherstate, state
return slices_list
In [ ]:
def sim_two_states(num_particles, times_states, det_states, par_states, times_unit, expon_s, seed=1):
"""Simulate 2-state transitions for a set of particles.
Arguments:
num_particles (int): number of simulated particles.
times_states (tuple of arrays): 2-tuple of timestamps arrays, one for each state
det_states (tuple of arrays): 2-tuple of detectors arrays, one for each state
par_states (tuple of arrays): 2-tuple of particles arrays, one for each state
times_unit (float): timestamps unit in seconds.
expon_s (tuple of scipy.stats distributions): 2-tuple of exponential distributions
used to simulate residency times for each state. Each element is a frozen
`scipy.stats.expon` distribution with scale parameter set according to the
residency time for the corresponding state.
Returns:
Tuple of 2 lists:
- List of timestamps arrays, one for each particle, after 2-states dynamics simulation.
- List of detectors arrays, one for each particle, after 2-states dynamics simulation.
"""
np.random.seed(seed)
times_p = []
det_p = []
for particle in range(num_particles):
print('\n- Simulating particle %d: ' % particle, end='', flush=True)
# Get timestamps and detectors for current particle in each state
print('timestamps..', end='', flush=True)
masks_states = [par == particle for par in par_states]
times_s = [memoryview(t_par[mask_par]) for t_par, mask_par in zip(times_states, masks_states)]
det_s = [memoryview(det_par[mask_par]) for det_par, mask_par in zip(det_states, masks_states)]
print('[done] ', end='', flush=True)
# Simulate residence times
print('residence..', end='', flush=True)
taus_s = [memoryview(exp_dist.rvs(n)) for exp_dist in expon_s]
sim_duration = np.sum(np.sum(taus) for taus in taus_s) * times_unit
assert sim_duration > acquisition_duration
print('[done] ', end='', flush=True)
# Compute start/stop indexes for the timestamps for each residence time
print('transition-index..', end='', flush=True)
slices_list = sim_two_states_single_particle(times_s, taus_s)
print('[done] ', end='', flush=True)
# Create new timestamps and detectors to store dynamics simulation results
print('merge..', end='', flush=True)
times_size = sum([s[1] - s[0] for s in slices_list])
times = np.zeros(times_size, dtype='int64')
det = np.zeros(times_size, dtype='uint8')
par = np.ones(times_size, dtype='uint8') * particle
times_m = memoryview(times)
det_m = memoryview(det)
# istart, istop are indexes of times_m/det_m while the
# start, stop indexes in slices_list refer to `times_s[state]`
# where state = 0 for odd elements and state = 1 for even elements.
# See `sim_two_states_single_particle()` for more info on `slice_list`.
istart = 0
state, otherstate = 0, 1
for start, stop in slices_list:
istop = istart + stop - start
times_m[istart:istop] = times_s[state][start:stop]
det_m[istart:istop] = det_s[state][start:stop]
istart = istop
state, otherstate = otherstate, state
print('[done]', flush=True)
assert (times != 0).all()
times_p.append(times)
det_p.append(det)
return times_p, det_p
In [ ]:
seed = 987123654 # random number generator seed
times_p, det_p = sim_two_states(35, (times_a, times_b), (det_a, det_b), (par_a, par_b),
times_unit=times_unit, expon_s=expon_s, seed=seed)
In [ ]:
assert all(all(np.diff(t) >= 0) for t in times_p)
assert len(times_p) == len(det_p) == 35
In [ ]:
det_p[0][:10]
In [ ]:
det_p[1][:10]
In [ ]:
times_p[0][:10]
In [ ]:
times_p[1][:10]
In [ ]:
times_a[par_a == 35]
In [ ]:
det_a[par_a == 35]
In [ ]:
times_p.append(times_a[par_a == 35])
det_p.append(det_a[par_a == 35])
In [ ]:
times_dyn = np.hstack(times_p)
det_dyn = np.hstack(det_p)
In [ ]:
argsort = times_dyn.argsort(kind='mergesort')
times_dyn = times_dyn[argsort]
det_dyn = det_dyn[argsort]
det_dyn
In [ ]:
par_dyn = np.hstack([det_p_i.size * [idx] for idx, det_p_i in enumerate(det_p)])
assert par_dyn.shape[0] == sum(d.size for d in det_p)
par_dyn = par_dyn[argsort]
In [ ]:
def make_photon_hdf5(times, det, par, times_unit, description, identity=None):
photon_data = dict(
timestamps = times,
timestamps_specs = dict(timestamps_unit=times_unit),
detectors = det,
particles = par,
measurement_specs = dict(
measurement_type = 'smFRET',
detectors_specs = dict(spectral_ch1 = np.atleast_1d(0),
spectral_ch2 = np.atleast_1d(1))))
setup = dict(
num_pixels = 2,
num_spots = 1,
num_spectral_ch = 2,
num_polarization_ch = 1,
num_split_ch = 1,
modulated_excitation = False,
lifetime = False,
excitation_alternated=(False,),
excitation_cw=(True,))
provenance = dict(software='', software_version='', filename='')
if identity is None:
identity = dict()
description = description
acquisition_duration = np.round((times[-1] - times[0]) * times_unit)
data = dict(
acquisition_duration = round(acquisition_duration),
description = description,
photon_data = photon_data,
setup=setup,
provenance=provenance,
identity=identity)
return data
In [ ]:
da.filename
In [ ]:
db.filename
In [ ]:
traj_descr = dedent('\n'.join(da.root.description.read().decode().split('\n')[4:7]))
print(traj_descr)
In [ ]:
part_D_descr = indent(dedent('\n'.join(da.root.description.read().decode().split('\n')[9:11])), ' ')
print(part_D_descr)
In [ ]:
state0_descr = indent(dedent('\n'.join(da.root.description.read().decode().split('\n')[11:13])), ' ')
print(state0_descr)
In [ ]:
state1_descr = indent(dedent('\n'.join(db.root.description.read().decode().split('\n')[11:13])), ' ')
print(state1_descr)
In [ ]:
bg_descr = dedent('\n'.join(da.root.description.read().decode().split('\n')[-4:]))
print(bg_descr)
In [ ]:
tau_a_ms = tau_a * 1e3
tau_b_ms = tau_b * 1e3
tau_a_us = tau_a * 1e6
tau_b_us = tau_b * 1e6
In [ ]:
filename_a_name = Path(filename_a).name
filename_b_name = Path(filename_b).name
In [ ]:
description = f"""\
PyBroMo simulation of 2-states dynamics
----------------------------------------
{traj_descr}
{part_D_descr}
State 0:
Residency time: {tau_a_ms} ms
{state0_descr}
filename: {filename_a_name}
State 1:
Residency time: {tau_b_ms} ms
{state0_descr}
filename: {filename_b_name}
{bg_descr}
"""
print(description)
In [ ]:
identity=dict(author='Antonino Ingargiola',
author_affiliation='UCLA')
In [ ]:
data = make_photon_hdf5(times_dyn, det_dyn, par_dyn, times_unit, description, identity=identity)
data
In [ ]:
h5_fname = f'smFRET_0eb9b3_P_35_s0_D_2.5e-11_dynamics_E_40-75_Tau_{tau_a_us:.0f}-{tau_b_us:.0f}us_EmTot_226k-200k_BgD900_BgA600_t_max_600s.hdf5'
h5_fname
In [ ]:
phc.hdf5.save_photon_hdf5(data, h5_fname=h5_fname, overwrite=True)